/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
package org.unidata.mdm.dq.data.service.segments.records.upsert;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.type.calculables.CalculableHolder;
import org.unidata.mdm.core.type.data.DataRecord;
import org.unidata.mdm.core.type.data.DataShift;
import org.unidata.mdm.core.type.data.OperationType;
import org.unidata.mdm.core.type.timeline.MutableTimeInterval;
import org.unidata.mdm.core.type.timeline.TimeInterval;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.core.util.SecurityUtils;
import org.unidata.mdm.data.service.impl.RecordComposerComponent;
import org.unidata.mdm.data.type.apply.RecordUpsertChangeSet;
import org.unidata.mdm.data.type.calculables.impl.DataRecordHolder;
import org.unidata.mdm.data.type.data.EtalonRecord;
import org.unidata.mdm.data.type.data.OriginRecord;
import org.unidata.mdm.data.type.data.OriginRecordInfoSection;
import org.unidata.mdm.data.type.data.impl.OriginRecordImpl;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.data.type.keys.RecordOriginKey;
import org.unidata.mdm.data.util.DataDiffUtils;
import org.unidata.mdm.dq.core.dto.DataQualityResult;
import org.unidata.mdm.dq.core.serialization.DataQualitySerializer;
import org.unidata.mdm.dq.core.type.io.DataQualityError;
import org.unidata.mdm.dq.core.type.io.DataQualitySpot;
import org.unidata.mdm.dq.core.type.model.instance.NamespaceAssignmentElement;
import org.unidata.mdm.dq.data.context.RecordQualityContext;
import org.unidata.mdm.dq.data.module.DataQualityDataModule;
import org.unidata.mdm.dq.data.type.search.RecordsDataQualityHeaderField;
import org.unidata.mdm.meta.configuration.Descriptors;
import org.unidata.mdm.meta.type.search.EntityIndexType;
import org.unidata.mdm.meta.type.search.RecordIndexId;
import org.unidata.mdm.search.configuration.SearchConfigurationConstants;
import org.unidata.mdm.search.context.IndexRequestContext;
import org.unidata.mdm.search.type.id.AbstractManagedIndexId;
import org.unidata.mdm.search.type.indexing.Indexing;
import org.unidata.mdm.search.type.indexing.IndexingField;
import org.unidata.mdm.search.type.indexing.IndexingRecord;
import org.unidata.mdm.system.service.PlatformConfiguration;
import org.unidata.mdm.system.type.annotation.ConfigurationRef;
import org.unidata.mdm.system.type.configuration.ConfigurationValue;
import org.unidata.mdm.system.type.pipeline.Point;
import org.unidata.mdm.system.type.pipeline.Start;

/**
 * @author Mikhail Mikhailov on Apr 9, 2021<br>
 * DQ processing gate.<br>
 * This class is a working example for decision point.<br>
 * In such a point you can decide what to do with the result (examine and throw or save).<br>
 */
@Component(RecordUpsertQualityGateExecutor.SEGMENT_ID)
public class RecordUpsertQualityGateExecutor extends Point<RecordQualityContext> {
    /**
     * This segment ID.
     */
    public static final String SEGMENT_ID = DataQualityDataModule.MODULE_ID + "[RECORD_UPSERT_QUALITY_GATE]";
    /**
     * Localized message code.
     */
    public static final String SEGMENT_DESCRIPTION = DataQualityDataModule.MODULE_ID + ".record.upsert.quality.gate.description";
    /**
     * PC.
     */
    @Autowired
    private PlatformConfiguration platformConfiguration;
    /**
     * RCC.
     */
    @Autowired
    private RecordComposerComponent recordComposerComponent;
    /**
     * The MMS instance.
     */
    @Autowired
    private MetaModelService metaModelService;
    /**
     * Delay for async audit operations.
     */
    @ConfigurationRef(SearchConfigurationConstants.PROPERTY_REFRESH_IMMEDIATE)
    private ConfigurationValue<Boolean> refreshImmediate;
    /**
     * Constructor.
     */
    public RecordUpsertQualityGateExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void point(RecordQualityContext ctx) {

        Map<TimeInterval<OriginRecord>, DataQualityResult> collected = ctx.qualityResult();
        if (MapUtils.isNotEmpty(collected)) {

            Timeline<OriginRecord> next = ctx.nextTimeline();
            RecordKeys keys = next.getKeys();
            RecordUpsertChangeSet cs = ctx.changeSet();
            NamespaceAssignmentElement nae = ctx.getAssignment();

            for (Entry<TimeInterval<OriginRecord>, DataQualityResult> re : collected.entrySet()) {

                TimeInterval<OriginRecord> ti = re.getKey();
                DataQualityResult r = re.getValue();

                if (r.isValid() && !r.isEnriched()) {
                    continue;
                }

                String periodIdAsString = AbstractManagedIndexId.periodIdValToString(ti.getPeriodId());

                processIndex(ctx, r, keys, periodIdAsString, cs);
                processData(ctx, r, keys, periodIdAsString, ti, nae);
            }
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean supports(Start<?, ?> start) {
        return RecordQualityContext.class.isAssignableFrom(start.getInputTypeClass());
    }
    /*
     * Process indexing updates
     */
    private void processIndex(RecordQualityContext ctx, DataQualityResult dqr, RecordKeys keys, String periodIdAsString, RecordUpsertChangeSet cs) {

        // Has no errors.
        if (dqr.isValid()) {
            return;
        }

        // We index one nested object per rule execution result
        List<IndexingRecord> rules = new ArrayList<>();
        dqr.getResults().forEach((s, results) ->

            results.forEach(rer -> {

                if (!rer.hasErrors() && !rer.hasSpots()) {
                    return;
                }

                List<IndexingField> fields = new ArrayList<>();

                // Set and rule names
                fields.add(IndexingField.of(RecordsDataQualityHeaderField.FIELD_SET_NAME.getName(), rer.getSet().getName()));
                fields.add(IndexingField.of(RecordsDataQualityHeaderField.FIELD_SET_DISPLAY_NAME.getName(), rer.getSet().getName()));
                fields.add(IndexingField.of(RecordsDataQualityHeaderField.FIELD_RULE_NAME.getName(), rer.getRule().getName()));
                fields.add(IndexingField.of(RecordsDataQualityHeaderField.FIELD_RULE_DISPLAY_NAME.getName(), rer.getRule().getName()));

                // Errors
                if (rer.hasErrors()) {

                    List<String> functions = new ArrayList<>(rer.getErrors().size());
                    List<String> messages = new ArrayList<>(rer.getErrors().size());
                    Set<String> severities = new HashSet<>(rer.getErrors().size());
                    List<Long> scores = new ArrayList<>(rer.getErrors().size());
                    Set<String> categories = new HashSet<>(rer.getErrors().size());
                    List<String> errors = new ArrayList<>(rer.getErrors().size());
                    long totalScore = 0;

                    for (DataQualityError error : rer.getErrors()) {

                        functions.add(error.getFunctionName());
                        messages.add(error.getMessage());
                        severities.add(error.getSeverity().name());
                        scores.add((long) error.getScore());
                        categories.add(error.getCategory());
                        errors.add(DataQualitySerializer.errorToString(error));
                        totalScore += error.getScore();
                    }

                    fields.add(IndexingField.ofStrings(RecordsDataQualityHeaderField.FIELD_FUNCTION_NAME.getName(), functions));
                    fields.add(IndexingField.ofStrings(RecordsDataQualityHeaderField.FIELD_MESSAGE.getName(), messages));
                    fields.add(IndexingField.ofStrings(RecordsDataQualityHeaderField.FIELD_SEVERITY.getName(), severities));
                    fields.add(IndexingField.ofIntegers(RecordsDataQualityHeaderField.FIELD_SCORE.getName(), scores));
                    fields.add(IndexingField.ofStrings(RecordsDataQualityHeaderField.FIELD_CATEGORY.getName(), categories));

                    // Errors as JSON objects
                    fields.add(IndexingField.ofStrings(RecordsDataQualityHeaderField.FIELD_ERROR.getName(), errors));

                    // Total score
                    fields.add(IndexingField.of(RecordsDataQualityHeaderField.FIELD_TOTAL_SCORE.getName(), totalScore));
                }

                // Spots.
                fields.add(IndexingField.ofStrings(RecordsDataQualityHeaderField.FIELD_SPOT.getName(), rer.getSpots().stream()
                        .map(DataQualitySpot::getPath)
                        .collect(Collectors.toList())));

                rules.add(IndexingRecord.of(fields));
            }));

        if (CollectionUtils.isNotEmpty(rules)) {

            RecordIndexId id = RecordIndexId.of(
                    keys.getEntityName(),
                    keys.getEtalonKey().getId(),
                    periodIdAsString);

            IndexRequestContext irc = IndexRequestContext.builder()
                    .drop(false)
                    .entity(keys.getEntityName())
                    .update(new Indexing(EntityIndexType.RECORD, id)
                        .withFields(IndexingField.ofRecords(RecordsDataQualityHeaderField.FIELD_QUALITY_ERRORS.getName(), rules)))
                    .routing(id.getRouting())
                    .refresh(!ctx.isBatchOperation() && refreshImmediate.getValue())
                    .build();

            cs.addIndexRequestContext(irc);
        }
    }
    /*
     * Process data updates
     */
    private void processData(RecordQualityContext ctx, DataQualityResult dqr,
            RecordKeys keys,
            String periodIdAsString, TimeInterval<OriginRecord> ti, NamespaceAssignmentElement nae) {

        DataRecord enrichment = dqr.getOutput()
                .getAsIdentity(nae.getNameSpace(), keys.getEntityName())
                .get(periodIdAsString);

        // Has no enrichments.
        if (Objects.isNull(enrichment)) {
            return;
        }

        String adminSourceSystem = metaModelService.instance(Descriptors.SOURCE_SYSTEMS)
            .getAdminElement()
            .getName();

        RecordOriginKey rok = keys.getSupplementaryKeysWithoutEnrichments().stream()
            .filter(ok -> StringUtils.equals(adminSourceSystem, ok.getSourceSystem())
                        && !ok.isEnrichment()
                        && ok.getInitialOwner().equals(UUID.fromString(keys.getEtalonKey().getId())))
            .findFirst()
            .orElse(null);

        Objects.requireNonNull(rok, "System origin key is null.");

        MutableTimeInterval<OriginRecord> mti = ti.unlock();
        CalculableHolder<OriginRecord> prev = mti.peek(rok.toBoxKey());
        EtalonRecord er = ti.getCalculationResult();

        DataRecord diff = DataDiffUtils.diffAsRecord(keys.getEntityName(), enrichment, er,
                Objects.nonNull(prev) ? prev.getValue() : null);

        if (null != diff) {

            Date ts = ctx.localTimestamp();
            OperationType operationType = ctx.operationType();
            String user = SecurityUtils.getCurrentUserName();

            OriginRecordInfoSection is = new OriginRecordInfoSection()
                    .withCreateDate(ts)
                    .withUpdateDate(ts)
                    .withCreatedBy(user)
                    .withUpdatedBy(user)
                    .withShift(DataShift.REVISED)
                    .withStatus(rok.getStatus())
                    .withValidFrom(ti.getValidFrom())
                    .withValidTo(ti.getValidTo())
                    .withMajor(platformConfiguration.getPlatformMajor())
                    .withMinor(platformConfiguration.getPlatformMinor())
                    .withOperationType(operationType == null ? OperationType.DIRECT : operationType)
                    .withRevision(0)
                    .withOriginKey(rok);

            OriginRecord origin = new OriginRecordImpl()
                    .withDataRecord(diff)
                    .withInfoSection(is);

            mti.push(new DataRecordHolder(origin));

            // Recalculate etalon.
            recordComposerComponent.toEtalon(keys, mti, ts, user);
        }
    }
}
